feat(iceberg): Add bucket function #13174
Can you help review? Thanks! @rui-mo @zhli1142015 |
rui-mo
left a comment
Thanks! Added two initial comments.
@jinchengchenghh Is there a GitHub issue that provides overall context for this work? If not, would it be possible to create one and explain what we are trying to achieve?
I recently created issue #13980 for this. @mbasmanova
velox/functions/iceberg/util/tests/Murmur3_32HashFunctionTest.cpp
ea854c8 to
bcd0630
Compare
|
Could you help review this PR? Thanks! @rui-mo |
mbasmanova
left a comment
@jinchengchenghh Let's split this PR into one that adds hash functions and the one that adds BucketFunction.
template <typename TInput>
FOLLY_ALWAYS_INLINE Status
call(int32_t& out, const int32_t& numBuckets, const TInput& input) {
  VELOX_RETURN_IF(
Should we add some more macros to reduce boilerplate?
We have a throwing version:
VELOX_USER_CHECK_GT(numBuckets, 0, "Invalid number of buckets")
Perhaps, we could add non-throwing version:
VELOX_USER_RETURN_GT(numBuckets, 0, "Invalid number of buckets")
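The idea of a non-throwing check can be sketched as follows. This is only an illustration under stated assumptions: `SketchStatus`, `SKETCH_USER_RETURN_UNLESS_GT`, and `checkedBucketCount` are hypothetical names invented for this example and are not Velox's actual Status class or macros, whose definitions are not shown in this thread.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>

// Hypothetical stand-in for a status type (NOT Velox's Status class).
struct SketchStatus {
  bool ok;
  std::string message;

  static SketchStatus OK() {
    return {true, ""};
  }
  static SketchStatus UserError(std::string msg) {
    return {false, std::move(msg)};
  }
};

// Hypothetical non-throwing check: makes the enclosing function return a
// user error unless `a > b` holds. Nothing is thrown.
#define SKETCH_USER_RETURN_UNLESS_GT(a, b, msg) \
  do {                                          \
    if (!((a) > (b))) {                         \
      return SketchStatus::UserError(msg);      \
    }                                           \
  } while (0)

// Hypothetical caller: validates the bucket count in a single line.
inline SketchStatus checkedBucketCount(int32_t numBuckets, int32_t& out) {
  SKETCH_USER_RETURN_UNLESS_GT(numBuckets, 0, "Invalid number of buckets");
  assert(numBuckets > 0); // Holds once the check above has passed.
  out = numBuckets;
  return SketchStatus::OK();
}
```

With this shape, `checkedBucketCount(0, out)` returns an error status and leaves `out` untouched, while `checkedBucketCount(4, out)` succeeds and sets `out` to 4.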
rui-mo
left a comment
Thanks.
yingsu00
left a comment
@mbasmanova @jinchengchenghh
Murmur3 is the hash function used by Iceberg's partition transform bucket(c,n). Its role is analogous to the hashXXX() functions in HivePartitionFunction. In general, a PartitionFunction for a specific connector is used in the following scenarios:
- Data Exchange in Queries (e.g., joins or aggregates):
  - PartitionFunction::partition() determines which reducer/task a row should go to, based on the hash of the partition keys. The hash algorithm differs from connector to connector. E.g., Iceberg uses Murmur3, while Hive uses Java's hashCode() function. In most cases it's just a simple bit shift and XOR.
  - The bucket function ensures that bucketed joins align buckets from both tables. The Hive bucket function is usually just the mod of the hash value: bucket = hash(key) % num_buckets. This could result in a negative value, while Iceberg uses a consistent bucket function that guarantees a non-negative value.
- File Writing:
  - When inserting into a partitioned table, Hive computes the partition number using HivePartitionFunction::partition(), based on the algorithm mentioned above.
  - It maps a row to a bucket using the connector-specific bucket function, based on the bucket calculation method mentioned above.
- Connector Specific Functions
  In Iceberg, users can directly query the partition functions, or use them in filters. E.g., SELECT * FROM t WHERE bucket(c,4) = 3, or SELECT day('2002-01-01'). Note that these functions are Iceberg specific and have their own semantics. For example, Iceberg's DAY(dt) is the number of days since 1970-01-01, while Presto's default DAY(dt) is just the day of that particular month.
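To make the difference in DAY semantics concrete, here is a minimal sketch. The names `daysFromCivil`, `icebergStyleDay`, and `calendarDayOfMonth` are hypothetical and exist only for this example; the day count uses Howard Hinnant's well-known days-from-civil algorithm rather than any Velox or Iceberg code.

```cpp
#include <cassert>
#include <cstdint>

// Days since 1970-01-01 for a proleptic Gregorian date
// (Howard Hinnant's days_from_civil algorithm).
inline int64_t daysFromCivil(int64_t y, unsigned m, unsigned d) {
  assert(m >= 1 && m <= 12 && d >= 1 && d <= 31);
  y -= m <= 2; // Treat January and February as part of the previous year.
  const int64_t era = (y >= 0 ? y : y - 399) / 400;
  const unsigned yoe = static_cast<unsigned>(y - era * 400); // [0, 399]
  const unsigned doy = (153 * (m > 2 ? m - 3 : m + 9) + 2) / 5 + d - 1;
  const unsigned doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;
  return era * 146097 + static_cast<int64_t>(doe) - 719468;
}

// Iceberg-style day transform: number of days since the Unix epoch.
inline int64_t icebergStyleDay(int64_t y, unsigned m, unsigned d) {
  return daysFromCivil(y, m, d);
}

// Calendar-style DAY(dt): just the day-of-month component.
inline unsigned calendarDayOfMonth(int64_t /*y*/, unsigned /*m*/, unsigned d) {
  return d;
}
```

For 2002-01-01, `icebergStyleDay(2002, 1, 1)` yields 11688, whereas `calendarDayOfMonth(2002, 1, 1)` yields 1, so the same function name gives very different results depending on which semantics apply.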
This PR only touches the third case. However, taking a more holistic view, we will need to add IcebergPartitionFunction shortly. If we want to keep it aligned with Hive, we may want to put the Murmur3 hash function and the bucket function in IcebergPartitionFunction (HivePartitionFunction keeps the hash algorithm implementations for Hive). Then, when we register the Iceberg bucket(c,n) function, it can call the hash algorithm implementations from IcebergPartitionFunction. As an alternative, if we want to put all hash algorithms in a central place, we may need to refactor HivePartitionFunction and move all its hash implementations there too. That way we can keep the Hive and Iceberg implementations consistent, and make the repo more manageable and readable.
I also wonder how we should set up the framework for connector-specific functions. E.g., consider catalog validation (ensuring these functions run on the correct catalogs, i.e., a user should not be able to run an Iceberg function on Hive tables). Also, should they be in a separate "iceberg" namespace? We'd love to hear more suggestions.
|
@yingsu00 Ying, thank you for sharing detailed context. This is very helpful.
I like this option. It would be helpful to hide the details of the hash computation inside these partition functions and avoid creating classes with duplicate widely-known names (Murmur3) and slightly different implementations.
We do not have namespaces in Velox. It might be helpful to add these at some point. In the meantime, we recommend using prefixes.
Why not? |
|
For data exchange in queries, Gluten calls this function the same way it calls other Hive functions; Gluten does not use the Velox Exchange. |
@mbasmanova Thanks for the information. In my understanding, different table format specifications (Hive, Iceberg, etc.) support different functions. They differ in the following aspects:
So my understanding is:
To avoid collisions, it would be ideal for Velox to adopt function namespaces so that, for example, hive.day() and iceberg.day() can coexist unambiguously. Hope this can be supported in the future. |
312015f to
324bee7
Compare
|
Could you help review again? Thanks! @mbasmanova @rui-mo |
rui-mo
left a comment
Thanks. Added some nits.
|
Do you have further comments? @rui-mo |
velox_exec_test_lib
velox_expression
velox_memory
velox_dwio_common_test_utils
Curious, why is this dependency needed?
Removed, thanks!
9b2c5f9 to
e1fdd29
Compare
e1fdd29 to
dfcd4a5
Compare
|
Do you have further comments? Thanks! @mbasmanova |
|
Could you help merge this one? Thanks! @Yuhta @mbasmanova |
|
@jinchengchenghh I see that Jimmy is working on merging this PR. Should land soon. |
Summary:
The implementation aligns with https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/transforms/Bucket.java
And described in Iceberg document https://iceberg.apache.org/spec/#partition-transforms
Resolves: facebookincubator#13980
Pull Request resolved: facebookincubator#13174
Reviewed By: kKPulla
Differential Revision: D81621760
Pulled By: Yuhta
fbshipit-source-id: a8051cbb2676a8db0fef95e41c5858004941b7ce
template <typename T>
FOLLY_ALWAYS_INLINE Status call(int32_t& out, int32_t numBuckets, T input) {
  VELOX_USER_RETURN_LE(numBuckets, 0, "Invalid number of buckets.");
nit: return Status::UserError("Invalid number of buckets");
This is the new VELOX_USER_RETURN_LE API; it returns a UserError internally.
I see.
The implementation aligns with https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/transforms/Bucket.java
and follows the bucket transform described in the Iceberg spec: https://iceberg.apache.org/spec/#partition-transforms
Resolves: #13980
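For readers unfamiliar with the transform, the following is a rough, self-contained sketch of the bucket computation for integer inputs as the Iceberg spec describes it: the value is hashed as an 8-byte little-endian long with the 32-bit Murmur3 (x86 variant, seed 0), and the bucket is (hash & Integer.MAX_VALUE) % N. The function names `murmur3HashInt64` and `sketchBucket` are hypothetical and this is not the code added by this PR.

```cpp
#include <cassert>
#include <cstdint>

inline uint32_t rotl32(uint32_t x, int r) {
  return (x << r) | (x >> (32 - r));
}

// Murmur3 per-block mixing of the input word.
inline uint32_t mixK1(uint32_t k1) {
  k1 *= 0xcc9e2d51u;
  k1 = rotl32(k1, 15);
  k1 *= 0x1b873593u;
  return k1;
}

// Murmur3 mixing of a block into the running hash state.
inline uint32_t mixH1(uint32_t h1, uint32_t k1) {
  h1 ^= k1;
  h1 = rotl32(h1, 13);
  return h1 * 5 + 0xe6546b64u;
}

// Hypothetical helper: 32-bit Murmur3 (seed 0) of a 64-bit value, processed
// as two little-endian 4-byte blocks (low word first).
inline uint32_t murmur3HashInt64(int64_t value) {
  const uint64_t bits = static_cast<uint64_t>(value);
  uint32_t h1 = 0; // Seed.
  h1 = mixH1(h1, mixK1(static_cast<uint32_t>(bits)));
  h1 = mixH1(h1, mixK1(static_cast<uint32_t>(bits >> 32)));
  // Finalization; 8 is the input length in bytes.
  h1 ^= 8u;
  h1 ^= h1 >> 16;
  h1 *= 0x85ebca6bu;
  h1 ^= h1 >> 13;
  h1 *= 0xc2b2ae35u;
  h1 ^= h1 >> 16;
  return h1;
}

// Hypothetical helper: clears the sign bit before taking the modulus, so the
// result is always in [0, numBuckets).
inline int32_t sketchBucket(int32_t numBuckets, int64_t value) {
  assert(numBuckets > 0);
  const uint32_t hash = murmur3HashInt64(value);
  return static_cast<int32_t>(
      (hash & 0x7fffffffu) % static_cast<uint32_t>(numBuckets));
}
```

As a sanity check, the Iceberg spec lists 2017239379 as the hash of the integer 34, which is what `murmur3HashInt64(34)` produces, so `sketchBucket(4, 34)` is 3. Masking the sign bit is what distinguishes this from a plain `hash % numBuckets`, which can be negative for negative hash values.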